/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog;

import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A generic catalog implementation that holds all meta objects in memory.
 */
public class GenericInMemoryCatalog extends AbstractCatalog {

	public static final String DEFAULT_DB = "default";

	private final Map<String, CatalogDatabase> databases;
	private final Map<ObjectPath, CatalogBaseTable> tables;
	private final Map<ObjectPath, CatalogFunction> functions;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;

	private final Map<ObjectPath, CatalogTableStatistics> tableStats;
	private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;

	public GenericInMemoryCatalog(String name) {
		this(name, DEFAULT_DB);
	}

	public GenericInMemoryCatalog(String name, String defaultDatabase) {
		super(name, defaultDatabase);

		this.databases = new LinkedHashMap<>();
		this.databases.put(defaultDatabase, new CatalogDatabaseImpl(new HashMap<>(), null));
		this.tables = new LinkedHashMap<>();
		this.functions = new LinkedHashMap<>();
		this.partitions = new LinkedHashMap<>();
		this.tableStats = new LinkedHashMap<>();
		this.tableColumnStats = new LinkedHashMap<>();
		this.partitionStats = new LinkedHashMap<>();
		this.partitionColumnStats = new LinkedHashMap<>();
	}

	@Override
	public void open() {
	}

	@Override
	public void close() {

	}

	// ------ databases ------

	@Override
	public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
			throws DatabaseAlreadyExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
		checkNotNull(db);

		if (databaseExists(databaseName)) {
			if (!ignoreIfExists) {
				throw new DatabaseAlreadyExistException(getName(), databaseName);
			}
		} else {
			databases.put(databaseName, db.copy());
		}
	}

	@Override
	public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
			throws DatabaseNotExistException, DatabaseNotEmptyException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

		if (databases.containsKey(databaseName)) {

			// Make sure the database is empty
			if (isDatabaseEmpty(databaseName)) {
				databases.remove(databaseName);
			} else if (cascade) {
				// delete all tables and functions in this database and then delete the database.
				List<ObjectPath> deleteTablePaths = tables.keySet().stream()
						.filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList());
				deleteTablePaths.forEach(objectPath -> {
						try {
							dropTable(objectPath, true);
						} catch (TableNotExistException e) {
							//ignore
						}
					});
				List<ObjectPath> deleteFunctionPaths = functions.keySet().stream()
						.filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList());
				deleteFunctionPaths.forEach(objectPath -> {
						try {
							dropFunction(objectPath, true);
						} catch (FunctionNotExistException e) {
							//ignore
						}
					});
				databases.remove(databaseName);
			} else {
				throw new DatabaseNotEmptyException(getName(), databaseName);
			}
		} else if (!ignoreIfNotExists) {
			throw new DatabaseNotExistException(getName(), databaseName);
		}
	}

	private boolean isDatabaseEmpty(String databaseName) {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

		return tables.keySet().stream().noneMatch(op -> op.getDatabaseName().equals(databaseName)) &&
			functions.keySet().stream().noneMatch(op -> op.getDatabaseName().equals(databaseName));
	}

	@Override
	public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
			throws DatabaseNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
		checkNotNull(newDatabase);

		CatalogDatabase existingDatabase = databases.get(databaseName);

		if (existingDatabase != null) {
			if (existingDatabase.getClass() != newDatabase.getClass()) {
				throw new CatalogException(
					String.format("Database types don't match. Existing database is '%s' and new database is '%s'.",
						existingDatabase.getClass().getName(), newDatabase.getClass().getName())
				);
			}

			databases.put(databaseName, newDatabase.copy());
		} else if (!ignoreIfNotExists) {
			throw new DatabaseNotExistException(getName(), databaseName);
		}
	}

	@Override
	public List<String> listDatabases() {
		return new ArrayList<>(databases.keySet());
	}

	@Override
	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

		if (!databaseExists(databaseName)) {
			throw new DatabaseNotExistException(getName(), databaseName);
		} else {
			return databases.get(databaseName).copy();
		}
	}

	@Override
	public boolean databaseExists(String databaseName) {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

		return databases.containsKey(databaseName);
	}

	// ------ tables ------

	@Override
	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
			throws TableAlreadyExistException, DatabaseNotExistException {
		checkNotNull(tablePath);
		checkNotNull(table);

		if (!databaseExists(tablePath.getDatabaseName())) {
			throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
		}

		if (tableExists(tablePath)) {
			if (!ignoreIfExists) {
				throw new TableAlreadyExistException(getName(), tablePath);
			}
		} else {
			tables.put(tablePath, table.copy());

			if (isPartitionedTable(tablePath)) {
				partitions.put(tablePath, new LinkedHashMap<>());
				partitionStats.put(tablePath, new LinkedHashMap<>());
				partitionColumnStats.put(tablePath, new LinkedHashMap<>());
			}
		}
	}

	@Override
	public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
			throws TableNotExistException {
		checkNotNull(tablePath);
		checkNotNull(newTable);

		CatalogBaseTable existingTable = tables.get(tablePath);

		if (existingTable != null) {
			if (existingTable.getClass() != newTable.getClass()) {
				throw new CatalogException(
					String.format("Table types don't match. Existing table is '%s' and new table is '%s'.",
						existingTable.getClass().getName(), newTable.getClass().getName()));
			}

			tables.put(tablePath, newTable.copy());
		} else if (!ignoreIfNotExists) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	// ------ tables and views ------

	@Override
	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException {
		checkNotNull(tablePath);

		if (tableExists(tablePath)) {
			tables.remove(tablePath);
			tableStats.remove(tablePath);
			tableColumnStats.remove(tablePath);

			partitions.remove(tablePath);
			partitionStats.remove(tablePath);
			partitionColumnStats.remove(tablePath);
		} else if (!ignoreIfNotExists) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	@Override
	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
			throws TableNotExistException, TableAlreadyExistException {
		checkNotNull(tablePath);
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName));

		if (tableExists(tablePath)) {
			ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);

			if (tableExists(newPath)) {
				throw new TableAlreadyExistException(getName(), newPath);
			} else {
				tables.put(newPath, tables.remove(tablePath));

				// table statistics
				if (tableStats.containsKey(tablePath)) {
					tableStats.put(newPath, tableStats.remove(tablePath));
				}

				// table column statistics
				if (tableColumnStats.containsKey(tablePath)) {
					tableColumnStats.put(newPath, tableColumnStats.remove(tablePath));
				}

				// partitions
				if (partitions.containsKey(tablePath)) {
					partitions.put(newPath, partitions.remove(tablePath));
				}

				// partition statistics
				if (partitionStats.containsKey(tablePath)) {
					partitionStats.put(newPath, partitionStats.remove(tablePath));
				}

				// partition column statistics
				if (partitionColumnStats.containsKey(tablePath)) {
					partitionColumnStats.put(newPath, partitionColumnStats.remove(tablePath));
				}
			}
		} else if (!ignoreIfNotExists) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	@Override
	public List<String> listTables(String databaseName) throws DatabaseNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");

		if (!databaseExists(databaseName)) {
			throw new DatabaseNotExistException(getName(), databaseName);
		}

		return tables.keySet().stream()
			.filter(k -> k.getDatabaseName().equals(databaseName)).map(k -> k.getObjectName())
			.collect(Collectors.toList());
	}

	@Override
	public List<String> listViews(String databaseName) throws DatabaseNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");

		if (!databaseExists(databaseName)) {
			throw new DatabaseNotExistException(getName(), databaseName);
		}

		return tables.keySet().stream()
			.filter(k -> k.getDatabaseName().equals(databaseName))
			.filter(k -> (tables.get(k) instanceof CatalogView)).map(k -> k.getObjectName())
			.collect(Collectors.toList());
	}

	@Override
	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
		checkNotNull(tablePath);

		if (!tableExists(tablePath)) {
			throw new TableNotExistException(getName(), tablePath);
		} else {
			return tables.get(tablePath).copy();
		}
	}

	@Override
	public boolean tableExists(ObjectPath tablePath) {
		checkNotNull(tablePath);

		return databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
	}

	private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
		if (!tableExists(tablePath)) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	// ------ functions ------

	@Override
	public void createFunction(ObjectPath path, CatalogFunction function, boolean ignoreIfExists)
			throws FunctionAlreadyExistException, DatabaseNotExistException {
		checkNotNull(path);
		checkNotNull(function);

		ObjectPath functionPath = normalize(path);

		if (!databaseExists(functionPath.getDatabaseName())) {
			throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName());
		}

		if (functionExists(functionPath)) {
			if (!ignoreIfExists) {
				throw new FunctionAlreadyExistException(getName(), functionPath);
			}
		} else {
			functions.put(functionPath, function.copy());
		}
	}

	@Override
	public void alterFunction(ObjectPath path, CatalogFunction newFunction, boolean ignoreIfNotExists)
			throws FunctionNotExistException {
		checkNotNull(path);
		checkNotNull(newFunction);

		ObjectPath functionPath = normalize(path);

		CatalogFunction existingFunction = functions.get(functionPath);

		if (existingFunction != null) {
			if (existingFunction.getClass() != newFunction.getClass()) {
				throw new CatalogException(
					String.format("Function types don't match. Existing function is '%s' and new function is '%s'.",
						existingFunction.getClass().getName(), newFunction.getClass().getName())
				);
			}

			functions.put(functionPath, newFunction.copy());
		} else if (!ignoreIfNotExists) {
			throw new FunctionNotExistException(getName(), functionPath);
		}
	}

	@Override
	public void dropFunction(ObjectPath path, boolean ignoreIfNotExists) throws FunctionNotExistException {
		checkNotNull(path);

		ObjectPath functionPath = normalize(path);

		if (functionExists(functionPath)) {
			functions.remove(functionPath);
		} else if (!ignoreIfNotExists) {
			throw new FunctionNotExistException(getName(), functionPath);
		}
	}

	@Override
	public List<String> listFunctions(String databaseName) throws DatabaseNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");

		if (!databaseExists(databaseName)) {
			throw new DatabaseNotExistException(getName(), databaseName);
		}

		return functions.keySet().stream()
			.filter(k -> k.getDatabaseName().equals(databaseName))
			.map(k -> k.getObjectName())
			.collect(Collectors.toList());
	}

	@Override
	public CatalogFunction getFunction(ObjectPath path) throws FunctionNotExistException {
		checkNotNull(path);

		ObjectPath functionPath = normalize(path);

		if (!functionExists(functionPath)) {
			throw new FunctionNotExistException(getName(), functionPath);
		} else {
			return functions.get(functionPath).copy();
		}
	}

	@Override
	public boolean functionExists(ObjectPath path) {
		checkNotNull(path);

		ObjectPath functionPath = normalize(path);

		return databaseExists(functionPath.getDatabaseName()) && functions.containsKey(functionPath);
	}

	private ObjectPath normalize(ObjectPath path) {
		return new ObjectPath(path.getDatabaseName(), FunctionIdentifier.normalizeName(path.getObjectName()));
	}

	// ------ partitions ------

	@Override
	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);
		checkNotNull(partition);

		ensureTableExists(tablePath);
		ensurePartitionedTable(tablePath);
		ensureFullPartitionSpec(tablePath, partitionSpec);

		if (partitionExists(tablePath, partitionSpec)) {
			if (!ignoreIfExists) {
				throw new PartitionAlreadyExistsException(getName(), tablePath, partitionSpec);
			}
		}

		partitions.get(tablePath).put(partitionSpec, partition.copy());
	}

	@Override
	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
			throws PartitionNotExistException, CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		if (partitionExists(tablePath, partitionSpec)) {
			partitions.get(tablePath).remove(partitionSpec);
			partitionStats.get(tablePath).remove(partitionSpec);
			partitionColumnStats.get(tablePath).remove(partitionSpec);
		} else if (!ignoreIfNotExists) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}
	}

	@Override
	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
			throws PartitionNotExistException, CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);
		checkNotNull(newPartition);

		if (partitionExists(tablePath, partitionSpec)) {
			CatalogPartition existingPartition = partitions.get(tablePath).get(partitionSpec);

			if (existingPartition.getClass() != newPartition.getClass()) {
				throw new CatalogException(
					String.format("Partition types don't match. Existing partition is '%s' and new partition is '%s'.",
						existingPartition.getClass().getName(), newPartition.getClass().getName())
				);
			}

			partitions.get(tablePath).put(partitionSpec, newPartition.copy());
		} else if (!ignoreIfNotExists) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}
	}

	@Override
	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
			throws TableNotExistException, TableNotPartitionedException, CatalogException {
		checkNotNull(tablePath);

		ensureTableExists(tablePath);
		ensurePartitionedTable(tablePath);

		return new ArrayList<>(partitions.get(tablePath).keySet());
	}

	@Override
	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
			throws TableNotExistException, TableNotPartitionedException, CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		ensurePartitionedTable(tablePath);

		CatalogTable catalogTable = (CatalogTable) getTable(tablePath);
		List<String> partKeys = catalogTable.getPartitionKeys();
		Map<String, String> spec = partitionSpec.getPartitionSpec();
		if (!partKeys.containsAll(spec.keySet())) {
			return new ArrayList<>();
		}

		return partitions.get(tablePath).keySet().stream()
			.filter(ps -> ps.getPartitionSpec().entrySet().containsAll(partitionSpec.getPartitionSpec().entrySet()))
			.collect(Collectors.toList());
	}

	@Override
	public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
			throws TableNotExistException, TableNotPartitionedException, CatalogException {
		throw new UnsupportedOperationException();
	}

	@Override
	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
			throws PartitionNotExistException, CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		if (!partitionExists(tablePath, partitionSpec)) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}

		return partitions.get(tablePath).get(partitionSpec).copy();
	}

	@Override
	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
			throws CatalogException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
	}

	private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
		throws TableNotExistException, PartitionSpecInvalidException {
		if (!isFullPartitionSpec(tablePath, partitionSpec)) {
			throw new PartitionSpecInvalidException(getName(), ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
				tablePath, partitionSpec);
		}
	}

	/**
	 * Check if the given partitionSpec is full partition spec for the given table.
	 */
	private boolean isFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException {
		CatalogBaseTable baseTable = getTable(tablePath);

		if (!(baseTable instanceof CatalogTable)) {
			return false;
		}

		CatalogTable table = (CatalogTable) baseTable;
		List<String> partitionKeys = table.getPartitionKeys();
		Map<String, String> spec = partitionSpec.getPartitionSpec();

		// The size of partition spec should not exceed the size of partition keys
		return partitionKeys.size() == spec.size() && spec.keySet().containsAll(partitionKeys);
	}

	private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
		if (!isPartitionedTable(tablePath)) {
			throw new TableNotPartitionedException(getName(), tablePath);
		}
	}

	/**
	 * Check if the given table is a partitioned table.
	 * Note that "false" is returned if the table doesn't exists.
	 */
	private boolean isPartitionedTable(ObjectPath tablePath) {
		CatalogBaseTable table = null;
		try {
			table = getTable(tablePath);
		} catch (TableNotExistException e) {
			return false;
		}

		return (table instanceof CatalogTable) && ((CatalogTable) table).isPartitioned();
	}

	// ------ statistics ------

	@Override
	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException {
		checkNotNull(tablePath);

		if (!tableExists(tablePath)) {
			throw new TableNotExistException(getName(), tablePath);
		}
		if (!isPartitionedTable(tablePath)) {
			CatalogTableStatistics result = tableStats.get(tablePath);
			return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
		} else {
			return CatalogTableStatistics.UNKNOWN;
		}
	}

	@Override
	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException {
		checkNotNull(tablePath);

		if (!tableExists(tablePath)) {
			throw new TableNotExistException(getName(), tablePath);
		}

		CatalogColumnStatistics result = tableColumnStats.get(tablePath);
		return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
	}

	@Override
	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
			throws PartitionNotExistException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		if (!partitionExists(tablePath, partitionSpec)) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}

		CatalogTableStatistics result = partitionStats.get(tablePath).get(partitionSpec);
		return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
	}

	@Override
	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
			throws PartitionNotExistException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);

		if (!partitionExists(tablePath, partitionSpec)) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}

		CatalogColumnStatistics result = partitionColumnStats.get(tablePath).get(partitionSpec);
		return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
	}

	@Override
	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
			throws TableNotExistException {
		checkNotNull(tablePath);
		checkNotNull(tableStatistics);

		if (tableExists(tablePath)) {
			tableStats.put(tablePath, tableStatistics.copy());
		} else if (!ignoreIfNotExists) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	@Override
	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
			boolean ignoreIfNotExists) throws TableNotExistException {
		checkNotNull(tablePath);
		checkNotNull(columnStatistics);

		if (tableExists(tablePath)) {
			tableColumnStats.put(tablePath, columnStatistics.copy());
		} else if (!ignoreIfNotExists) {
			throw new TableNotExistException(getName(), tablePath);
		}
	}

	@Override
	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
			boolean ignoreIfNotExists) throws PartitionNotExistException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);
		checkNotNull(partitionStatistics);

		if (partitionExists(tablePath, partitionSpec)) {
			partitionStats.get(tablePath).put(partitionSpec, partitionStatistics.copy());
		} else if (!ignoreIfNotExists) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}
	}

	@Override
	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
			boolean ignoreIfNotExists) throws PartitionNotExistException {
		checkNotNull(tablePath);
		checkNotNull(partitionSpec);
		checkNotNull(columnStatistics);

		if (partitionExists(tablePath, partitionSpec)) {
			partitionColumnStats.get(tablePath).put(partitionSpec, columnStatistics.copy());
		} else if (!ignoreIfNotExists) {
			throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
		}
	}

}
